Skip to main content
Version: 4.x

RocketMQ Connect Quick Start

Quick Start

This tutorial will start a RocketMQ Connector example project "rocketmq-connect-sample" in standalone mode to help you understand the working principle of connectors. The example project provides a source connector that reads data from source files and sends it to the RocketMQ cluster. It also provides a sink connector that reads messages from the RocketMQ cluster and writes them to destination files.

1. Preparation: Start RocketMQ

  1. Linux/Unix/Mac
  2. 64bit JDK 1.8+;
  3. Maven 3.2.x+;
  4. Start RocketMQ. Either RocketMQ 4.x or RocketMQ 5.x 5.x version can be used;
  5. Test RocketMQ message sending and receiving using the tool.

Here, use the environment variable NAMESRV_ADDR to inform the tool client of the NameServer address of RocketMQ as localhost:9876.

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

Note: RocketMQ has the feature of automatically creating Topic and Group. When sending or subscribing to messages, if the corresponding Topic or Group does not exist, RocketMQ will automatically create them. Therefore, there is no need to create Topic and Group in advance.

2. Build Connector Runtime

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

Note: The project already includes the code for rocketmq-connect-sample by default, so there is no need to build the rocketmq-connect-sample plugin separately.

3. Run Connector Worker in Standalone Mode

Modify Configuration

Modify the connect-standalone.conf file to configure the RocketMQ connection address and other information. Please refer to 9. Configuration File Instructions for details.

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

In standalone mode, RocketMQ Connect persists the synchronization checkpoint information to the local file directory storePathRootDir.

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

If you want to reset the synchronization checkpoint, you need to delete the persisted checkpoint file.

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

Start Connector Worker in Standalone Mode

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

tips: You can modify docker/connect/bin/runconnect.sh to adjust JVM startup parameters as needed.

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"

To view the startup log file:

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

If the runtime starts successfully, you will see the following print in the log file:

The standalone worker boot success.

To exit the log tracking mode of tail -f command, you can press the Ctrl + C key combination.

4. Start Source Connector

Create Source File and Write Test Data

mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt

echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt

Note: There should be no empty lines (the demo program will throw an error if it encounters empty lines). The source connector will continuously read the source file and convert each line of data into a message body to be sent to RocketMQ for consumption by the sink connector.

Start Source Connector

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'

If the curl request returns status 200, it indicates successful creation. Example response:

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}

View the log file:

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

If you see the following log, it means the file source connector has started successfully:

Start connector fileSourceConnector and set target state STARTED successed!!

Source Connector Configuration Instructions

keynullabledefaultdescription
connector.classfalseThe class name (including the package name) that implements the Connector interface
filenamefalseThe name of the source file (recommended to use absolute path)
connect.topicnamefalseTopic required for synchronizing file data

5. Start sink connector

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'

If the curl request returns status 200, it indicates successful creation. Example response:

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}

View the log file:

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

If you see the following log, it means the file sink connector has started successfully:

Start connector fileSinkConnector and set target state STARTED successed!!

Check if the sink connector has written data to the destination file:

cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

If the test-sink-file.txt file is generated and its content is the same as the test-source-file.txt, it means the entire process is running correctly.

Continue writing test data to the source file test-source-file.txt:

cd /Users/YourUsername/rocketmqconnect/

echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt

# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

Note: The order of file contents may vary because the rocketmq-connect-sample uses normal message when sending and receiving messages to/from a RocketMQ topic. This is different from ordered message, and consuming normal messages does not guarantee the order.

sink connector configuration instructions

keynullabledefaultdescription
connector.classfalseThe class name (including the package name) that implements the Connector interface
filenamefalseThe sink pulls data and saves it to a file(recommended to use absolute path)
connect.topicnamesfalseThe topics of the data messages that the sink needs to process

Tips:The configuration file instructions for the sample rocketmq-connect-sample are for reference only, different source/sink connectors have different configurations, please refer to the specific source/sink connector.

6. Stop connector

The RESTful command format for stopping connectors is http://(your worker ip):(port)/connectors/(connector name)/stop

To stop the two connectors in the demo, you can use the following commands:

curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop

If the curl request returns a status of 200, it indicates successful stopping of the connectors. Example response:

{"status":200,"body":"Connector [fileSinkConnector] deleted successfully"}

If you see the following log message, it means the file sink connector has been successfully shut down:

tail -100f ~/logs/rocketmqconnect/connect_default.log

Completed shutdown for connectorName:fileSinkConnector

7. Stop the Worker process

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh

8. Log directory

You can use the following commands to view the log directory:

ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect

9. Configuration File Instructions

Modify the RESTful port, storeRoot path, Nameserver address, and other information based on your usage.

Here is an example of a configuration file:

#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1

# Http prot for user to access REST API
httpPort=8082

# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876

# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=

Explanation of storePathRootDir configuration:

In standalone mode, RocketMQ Connect persists the synchronization checkpoint information to the local file directory specified by storePathRootDir. The persistent files include:

keydescription
connectorConfig.jsonConnector configuration persistence files
position.jsonSource connect data processing progress persistence files
taskConfig.jsonTask configuration persistence files
offset.jsonSink connect data consumption progress persistence files
connectorStatus.jsonConnector status persistence files
taskStatus.jsonTask status persistence files